# Part of Odoo. See LICENSE file for full copyright and licensing details.

import logging
from odoo.addons.base.tests.common import HttpCaseWithUserPortal, HttpCaseWithUserDemo

from contextlib import closing

from odoo.sql_db import categorize_query
from odoo.tools import mute_logger
from odoo.tests.common import tagged


_logger = logging.getLogger(__name__)


class UtilPerf(HttpCaseWithUserPortal, HttpCaseWithUserDemo):
    """ Base HTTP test case with helpers to count, and categorize per table,
    the SQL queries performed while serving a url.
    """

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        # remove menu containing a slug url (only website_helpdesk normally), to
        # avoid the menu cache being disabled, which would increase sql queries
        cls.env['website.menu'].search([
            ('url', '=like', '/%/%-%'),
        ]).unlink()
        # if website_livechat is installed before another module, the
        # get_livechat_channel_info add unrelated query for the current test.
        # So we disable it.
        if 'channel_id' in cls.env['website']:
            cls.env['website'].search([]).channel_id = False

    def _get_url_hot_query(self, url, query_list=False, nocache=False):
        """ This method returns the number of SQL Queries used inside a request.
        The returned query number will be the same as a "real" (outside of test
        mode) case: the method takes care of removing the extra queries related
        to the testing mode and to add the missing one from "real" use case.

        The goal is to ease the code reading and debugging of those perf testing
        methods, as one will have the same query count written in the test than
        it shows in "real" case logs.

        eg: if a page is taking X SQL query count to be loaded outside test mode
            in "real" case, the test is expected to also use X as query count
            value to be asserted/checked.

        :param str url: url to be checked
        :param bool query_list: whether the method should also return list of
            queries (without test cursor savepoint queries)
        :param bool nocache: whether to append a ``nocache`` parameter to the
            query string of ``url``
        :return: the query count plus the list of queries if ``query_list``
            is ``True``
        :rtype: int|tuple(int, list)
        """
        # make sure the url carries a query string, so that extra parameters
        # can always be appended with '&'
        if '?' not in url:
            url += '?'
        if nocache:
            url += '&nocache'

        # ensure worker is in hot state
        self.url_open(url)
        self.url_open(url)
        self.url_open(url)

        nested_profiler = self.profile(collectors=['sql'], db=False)
        with nested_profiler:
            # NOTE(review): presumably stands for the registry signaling query
            # a "real" request performs -- confirm
            self.registry.get_sequences(self.cr)
            self.url_open(url)

        profiler = nested_profiler._profiler__
        self.assertEqual(len(profiler.sub_profilers), 1, "we expect to have only one accessed url") # if not adapt the code below
        route_profiler = profiler.sub_profilers[0]
        route_entries = route_profiler.collectors[0].entries
        # queries collected by the outer profiler come first, followed by the
        # ones collected by the profiler of the accessed route
        entries = profiler.collectors[0].entries + route_entries
        sql_queries = [entry['full_query'].strip() for entry in entries]
        sql_count = len(sql_queries)
        if query_list:
            return sql_count, sql_queries
        return sql_count

    def _check_url_hot_query(self, url, expected_query_count, select_tables_perf=None, insert_tables_perf=None, nocache=False):
        """ Assert the total number of SQL queries performed while serving
        ``url`` as well as their distribution per table.

        Queries are classified with ``categorize_query``: ``from`` queries are
        compared to ``select_tables_perf`` and ``into`` queries to
        ``insert_tables_perf``. A query of any other type is logged as a
        warning and left out of the per table counters; it is still part of
        the total query count.

        :param str url: url to be checked
        :param int expected_query_count: expected total number of queries
        :param dict select_tables_perf: expected number of ``from`` queries
            per table name (``None`` means no such query is expected)
        :param dict insert_tables_perf: expected number of ``into`` queries
            per table name (``None`` means no such query is expected)
        :param bool nocache: forwarded to :meth:`_get_url_hot_query`
        """
        query_count, sql_queries = self._get_url_hot_query(url, query_list=True, nocache=nocache)

        sql_from_tables = {}
        sql_into_tables = {}

        query_separator = '\n' + '-' * 100 + '\n'
        queries = query_separator.join(sql_queries)

        for query in sql_queries:
            query_type, table = categorize_query(query)
            if query_type == 'into':
                log_target = sql_into_tables
            elif query_type == 'from':
                log_target = sql_from_tables
            else:
                _logger.warning("Query type %s for query %s is not supported by _check_url_hot_query", query_type, query)
                # there is no counter for this kind of query: skip it rather
                # than hitting an unbound (first query) or stale (counter of
                # the previous query) `log_target`
                continue
            log_target[table] = log_target.get(table, 0) + 1

        select_tables_perf = select_tables_perf or {}
        insert_tables_perf = insert_tables_perf or {}

        def count_diff(actual, expected):
            """ Return the non-zero ``actual - expected`` differences per table. """
            deltas = {
                table: actual.get(table, 0) - expected.get(table, 0)
                for table in set(actual) | set(expected)
            }
            return {table: delta for table, delta in deltas.items() if delta}

        select = count_diff(sql_from_tables, select_tables_perf)
        insert = count_diff(sql_into_tables, insert_tables_perf)

        if query_count != expected_query_count:
            self.fail(f"Expected {expected_query_count} queries but {query_count} where ran:\nDiff of select queries: {select}\nDiff of insert queries: {insert}{query_separator}{queries}{query_separator}")

        self.assertDictEqual(sql_from_tables, select_tables_perf, f'Select queries does not match: {select}{query_separator}{queries}{query_separator}')
        self.assertDictEqual(sql_into_tables, insert_tables_perf, f'Insert queries does not match: {insert}{query_separator}{queries}{query_separator}')


class TestStandardPerformance(UtilPerf):
    """ SQL query count of the `/web/image` controller. """

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        # give the user with id 2 an image, so that there is something to serve
        user = cls.env['res.users'].browse(2)
        user.image_1920 = b'iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAIAAACQd1PeAAAADElEQVR4nGNgYGAAAAAEAAH2FzhVAAAAAElFTkSuQmCC'

    @mute_logger('odoo.http')
    def test_10_perf_sql_img_controller(self):
        self.authenticate('demo', 'demo')
        # not published user, get the not found image placeholder
        user = self.env['res.users'].sudo().browse(2)
        self.assertEqual(user.website_published, False)
        query_count = self._get_url_hot_query('/web/image/res.users/2/image_256')
        self.assertEqual(query_count, 7)

    @mute_logger('odoo.http')
    def test_11_perf_sql_img_controller(self):
        self.authenticate('demo', 'demo')
        # published user this time: the actual image is expected to be served
        user = self.env['res.users'].sudo().browse(2)
        user.website_published = True
        expected_selects = {
            'orm_signaling_registry': 1,
            'res_users': 2,
            'res_partner': 1,
            'ir_attachment': 1,
        }
        self._check_url_hot_query('/web/image/res.users/2/image_256', 5, expected_selects)

    @mute_logger('odoo.http')
    def test_20_perf_sql_img_controller_bis(self):
        favicon_url = '/web/image/website/1/favicon'
        expected_selects = {
            'orm_signaling_registry': 1,
            # website:
            # 1. `_find_record()` performs an access right check through
            #    `exists()` which perform a request on the website.
            # 2. `_get_stream_from` ends up reading the requested record to
            #    give a name to the file (downloaded_name)
            'website': 2,
            # ir_attachment:
            # 1. `_record_to_stream()` does a `search()`..
            # 2. ..followed by a `_read()`
            'ir_attachment': 2,
        }
        # first without being logged in..
        self._check_url_hot_query(favicon_url, 5, expected_selects)

        # ..then as portal user, the same queries are expected
        self.authenticate('portal', 'portal')
        self._check_url_hot_query(favicon_url, 5, expected_selects)


class TestWebsitePerformanceCommon(UtilPerf):
    """ Common setup: a published, untracked test page with its menu entry. """

    def setUp(self):
        super().setUp()
        self.page, self.menu = self._create_page_with_menu('/sql_page')

    def _create_page_with_menu(self, url):
        """ Create a published, untracked qweb page served at ``url`` on the
        website with id 1, along with a menu pointing to it placed under the
        top menu of that website.

        :param str url: url of the page; its first character is dropped to
            build the page name, view key and menu name
        :return: the created page and menu
        :rtype: tuple
        """
        name = url[1:]
        website = self.env['website'].browse(1)
        page_values = {
            'url': url,
            'name': name,
            'type': 'qweb',
            'arch': '<t name="%s" t-name="website.page_test_%s"> \
                       <t t-call="website.layout"> \
                         <div id="wrap"><div class="oe_structure"/></div> \
                       </t> \
                     </t>' % (name, name),
            'key': f'website.page_test_{name}',
            'is_published': True,
            'website_id': website.id,
            'track': False,
        }
        page = self.env['website.page'].create(page_values)
        menu_values = {
            'name': name,
            'url': url,
            'page_id': page.id,
            'website_id': website.id,
            'parent_id': website.menu_id.id,
        }
        menu = self.env['website.menu'].create(menu_values)
        return page, menu


class TestWebsitePerformance(TestWebsitePerformanceCommon):
    """ SQL query count of website pages, requested with and without the
    `nocache` url parameter.
    """

    # Expected `from` queries per table for a standard page (no `nocache`).
    _CACHED_PAGE_SELECTS = {
        'orm_signaling_registry': 1,
        # `_get_serve_attachment` dispatcher fallback
        'ir_attachment': 1,
    }
    # Expected `from` queries per table for a standard page with `nocache`.
    _UNCACHED_PAGE_SELECTS = {
        'orm_signaling_registry': 1,
        # `_get_serve_attachment` dispatcher fallback
        'ir_attachment': 1,
        # 1. `_serve_page` search page matching URL..
        # 2. ..then reads it (`is_visible`)
        'website_page': 2,
        'website': 1,
        # menu and layout
        'website_menu': 1,
        # Check if `view.track` to track visitor or not
        'ir_ui_view': 1,
        # layout content (company name, logo)
        'res_company': 1,
    }
    # Expected `into` queries per table when the visitor gets upserted.
    _VISITOR_UPSERT_INSERTS = {
        'website_visitor': 1,
    }

    def _assert_hot_queries(self, url, expected_count, selects, inserts=None, nocache=False):
        """ Check the per table distribution of the queries of ``url``, then
        measure the url once more and assert the bare query count.
        """
        self._check_url_hot_query(url, expected_count, selects, inserts, nocache=nocache)
        self.assertEqual(self._get_url_hot_query(url, nocache=nocache), expected_count)

    def test_10_perf_sql_queries_page(self):
        # standard untracked website.page
        for readonly_enabled in (True, False):
            self.env.registry.test_readonly_enabled = readonly_enabled
            self.set_registry_readonly_mode(readonly_enabled)
            with self.subTest(readonly_enabled=readonly_enabled), closing(self.env.cr.savepoint()):
                self._assert_hot_queries(self.page.url, 2, self._CACHED_PAGE_SELECTS)
                # page being or not in menu shouldn't add queries
                self.menu.unlink()
                self._assert_hot_queries(self.page.url, 2, self._CACHED_PAGE_SELECTS)

                self._assert_hot_queries(self.page.url, 8, self._UNCACHED_PAGE_SELECTS, nocache=True)

    def test_15_perf_sql_queries_page(self):
        # standard tracked website.page
        for cache in (True, False):
            for readonly_enabled in (True, False):
                self.set_registry_readonly_mode(readonly_enabled)
                with self.subTest(readonly_enabled=readonly_enabled), closing(self.env.cr.savepoint()):
                    if cache:
                        selects, query_count = self._CACHED_PAGE_SELECTS, 2
                    else:
                        selects, query_count = self._UNCACHED_PAGE_SELECTS, 8
                    if readonly_enabled:
                        inserts = {}
                    else:
                        # Visitor upsert
                        inserts = self._VISITOR_UPSERT_INSERTS
                        query_count += 1
                    self.page.track = True

                    # page being or not in menu shouldn't add queries
                    self.menu.unlink()
                    self._assert_hot_queries(self.page.url, query_count, selects, inserts, nocache=not cache)

    def test_20_perf_sql_queries_homepage(self):
        # homepage "/" has its own controller
        uncached_selects = {
            'orm_signaling_registry': 1,
            # homepage controller is prefetching all menus for perf in one go
            'website_menu': 1,
            # 1. the menu prefetching is also prefetching all menu's pages
            # 2. find page matching the `/` url
            'website_page': 2,
            'website': 1,
            # layout
            'ir_ui_view': 1,
            'res_company': 1,
        }
        for readonly_enabled in (True, False):
            self.set_registry_readonly_mode(readonly_enabled)
            with self.subTest(readonly_enabled=readonly_enabled), closing(self.env.cr.savepoint()):
                if readonly_enabled:
                    inserts, extra_queries = {}, 0
                else:
                    # Visitor upsert
                    inserts, extra_queries = self._VISITOR_UPSERT_INSERTS, 1

                self._assert_hot_queries('/', 1 + extra_queries, {'orm_signaling_registry': 1}, inserts)
                self._assert_hot_queries('/', 7 + extra_queries, uncached_selects, inserts, nocache=True)

    def test_30_perf_sql_queries_page_no_layout(self):
        # untrack website.page with no call to layout templates
        self.page.arch = '<div>I am a blank page</div>'

        self._assert_hot_queries(self.page.url, 2, self._CACHED_PAGE_SELECTS)

        no_layout_selects = {
            'orm_signaling_registry': 1,
            # `_get_serve_attachment` dispatcher fallback
            'ir_attachment': 1,
            # 1. `_serve_page` search page matching URL..
            # 2. ..then reads it (`is_visible`)
            # NOTE(review): two steps are listed above while a single query is
            # expected -- confirm which one is accurate
            'website_page': 1,
            # Check if website.cookies_bar is active
            'website': 1,
            # Check if `view.track` to track visitor or not
            'ir_ui_view': 1,
        }
        self._assert_hot_queries(self.page.url, 5, no_layout_selects, nocache=True)

    def test_40_perf_sql_queries_page_multi_level_menu(self):
        # menu structure should not impact SQL requests
        menus = {}
        for url in ('/a', '/aa', '/b', '/bb'):
            _, menus[url] = self._create_page_with_menu(url)
        # NOTE(review): this url has no leading '/', so the name derived from
        # it is empty -- confirm this is intended
        self._create_page_with_menu('c')
        menus['/bb'].parent_id = menus['/b']
        menus['/aa'].parent_id = menus['/a']

        self._assert_hot_queries(self.page.url, 2, self._CACHED_PAGE_SELECTS)
        self._assert_hot_queries(self.page.url, 8, self._UNCACHED_PAGE_SELECTS, nocache=True)

@tagged('-at_install', 'post_install')
class TestWebsitePerformancePost(UtilPerf):
    """ SQL query count of the assets route, run post install. """

    @mute_logger('odoo.http')
    def test_50_perf_sql_web_assets(self):
        # assets route /web/assets/..
        bundle = self.env['ir.qweb']._get_asset_bundle('web.assets_frontend_lazy', css=False, js=True)
        assets_url = bundle.get_links()[0]
        self.assertIn('web.assets_frontend_lazy.min.js', assets_url)

        expected_query_count = 3
        expected_selects = {
            'orm_signaling_registry': 1,
            # All 2 coming from the /web/assets and ir.binary stack
            # 1. `search() the attachment`
            # 2. `_record_to_stream` reads the other attachment fields
            'ir_attachment': 2,
        }
        self._check_url_hot_query(assets_url, expected_query_count, expected_selects)
        self.assertEqual(self._get_url_hot_query(assets_url), expected_query_count)
